/*
 * Copyright (C) 2018 The Asiainfo-Ability Authors
 *
 *      Licensed under the Apache License, Version 2.0 (the "License");
 *      you may not use this file except in compliance with the License.
 *      You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 *      Unless required by applicable law or agreed to in writing, software
 *      distributed under the License is distributed on an "AS IS" BASIS,
 *      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *      See the License for the specific language governing permissions and
 *      limitations under the License.
 */

package org.asiainfo.ability.provider.configuration;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.asiainfo.ability.base.queue.OperEvent;
import org.asiainfo.ability.base.queue.QueueObserver;
import org.asiainfo.ability.base.utils.SerializationUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * Spring configuration that connects a Redis pub/sub subscription to the
 * local {@link QueueObserver}.
 *
 * <p>Every message received on {@link #topicName} is deserialized into an
 * {@link OperEvent} and passed to {@code QueueObserver.publishEvent}.
 *
 * @Author: visen
 * @Date: 2018/1/16
 */
@Configuration
public class OperConfig {

    private static final Logger logger = LogManager.getLogger(OperConfig.class);

    /**
     * Name the listener container subscribes to. It is registered as a
     * {@link PatternTopic}; the value contains no glob wildcards.
     */
    public static final String topicName = "queue-channel";

    /**
     * Builds the Redis listener container and subscribes the adapter to
     * {@link #topicName}.
     *
     * @param connectionFactory factory the container uses for its Redis connection
     * @param listenerAdapter   listener that receives the messages
     * @return the configured container
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(topicName));
        return container;
    }

    /**
     * Exposes a {@link QueueObserver} bean, the target of the events decoded
     * by {@link #listenerAdapter(QueueObserver)}.
     *
     * @return a new observer instance
     */
    @Bean
    QueueObserver queueObserver() {
        return new QueueObserver();
    }

    /**
     * Creates the adapter that decodes incoming Redis messages and forwards
     * them to the observer.
     *
     * @param queueService observer that receives every decoded {@link OperEvent}
     * @return adapter wrapping the forwarding listener
     */
    @Bean
    MessageListenerAdapter listenerAdapter(QueueObserver queueService) {
        // Typed local is required: the adapter constructor takes Object, so a
        // bare lambda would have no target type.
        MessageListener forwarder = (message, pattern) -> forward(queueService, message);
        return new MessageListenerAdapter(forwarder);
    }

    /**
     * Decodes one Redis message and hands the resulting event to the observer.
     * Messages without a payload, or whose payload decodes to {@code null},
     * are logged and skipped instead of being forwarded. Exceptions thrown by
     * deserialization or by the observer are not caught here and propagate to
     * the caller.
     *
     * @param queueService observer to notify
     * @param message      raw message received from Redis
     */
    private static void forward(QueueObserver queueService, Message message) {
        byte[] body = message.getBody();
        if (body == null || body.length == 0) {
            logger.warn("Ignoring message without payload on '{}'", topicName);
            return;
        }

        // NOTE(review): deserializeNative presumably uses Java native
        // serialization - confirm that only trusted publishers can write to
        // this channel.
        OperEvent event = SerializationUtils.deserializeNative(body, OperEvent.class);
        if (event == null) {
            logger.warn("Ignoring message on '{}': payload of {} bytes did not decode to an OperEvent",
                    topicName, body.length);
            return;
        }

        queueService.publishEvent(event);
    }
}
